package com.lelebd.szt.data;

import cn.hutool.core.io.FileUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.lelebd.szt.util.ParseCardNo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

@Slf4j
public class SZTDataToKafka {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "139.198.108.148:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //acks=0时，producer不会等待确认;acks=1时，等待leader写到local log就行;acks=all或acks=-1时，等待isr中所有副本确认
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        String topic = "topic-hotitems";

        String SAVE_PATH = APIGetSZTData.class.getResource("/szt-data-page.jsons").getPath();
        List<String> lines = FileUtil.readUtf8Lines(SAVE_PATH);
        JSONArray data = null;
        JSONObject record = null;
        ProducerRecord<String, String> producerRecord = null;
        for (int i = 0, size = lines.size(); i < size; i++) {
            log.info("解析文本行号：{}/{} ", i, size);
            data = JSONObject.parseObject(lines.get(0)).getJSONArray("data");
            for (int j = 0, len = data.size(); j < len; j++) {
                try {
                    record = data.getJSONObject(i);
                    record.put("card_no", ParseCardNo.parse(record.getString("card_no")));
                    producerRecord = new ProducerRecord<>(topic, record.toJSONString());
                    Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
                    RecordMetadata metadata = future.get();
                    log.info("发送成功 topic={} partition={} offset={}", metadata.topic(),metadata.partition(),metadata.offset());
                } catch (IndexOutOfBoundsException e1) {
                    log.info("文本行line=[" + i + "] 解析异常： " + record);
                    e1.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
